package com.atguigu.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializeConfig;

import com.atguigu.gmall.realtime.app.func.DimAsyncFunctionNew1;
import com.atguigu.gmall.realtime.bean.OrderWide;
import com.atguigu.gmall.realtime.bean.PaymentWide;
import com.atguigu.gmall.realtime.bean.ProductStats;
import com.atguigu.gmall.realtime.common.GmallConfig;
import com.atguigu.gmall.realtime.common.GmallConstant;
import com.atguigu.gmall.realtime.utils.ClickHouseUtil;
import com.atguigu.gmall.realtime.utils.DateTimeUtil;
import com.atguigu.gmall.realtime.utils.MyKafkaUtil;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;


/**
 * Author: Felix
 *
 * Aggregates per-SKU counts (clicks, displays, favors, cart adds, orders,
 * payments, refunds, comments) from the DWD/DWM Kafka streams over fixed
 * time windows, then enriches the unioned result with the SKU/SPU/trademark/
 * category dimension tables before writing it to ClickHouse.
 */
public class ProductStatsAppSqlNew {

    public static void main(String[] args) throws Exception {
        //TODO 1. Basic environment setup
        //1.1 Create the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Parallelism
        env.setParallelism(4);
        // The tumbling windows below are driven by event time + watermarks.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Table environment used to run the windowed SQL aggregations.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        /*
        //1.3 Checkpoint settings — enable for production runs:
        env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new FsStateBackend(
                "hdfs://hadoop202:8020/gmall/flink/checkpoint/ProductStatsApp"));
        System.setProperty("HADOOP_USER_NAME","atguigu");
        */

        //TODO 2. Read the source streams from Kafka
        //2.1 Topic names and the shared consumer group
        String groupId = "product_stats_app";
        String pageViewSourceTopic = "dwd_page_log";
        String favorInfoSourceTopic = "dwd_favor_info";
        String cartInfoSourceTopic = "dwd_cart_info";
        String refundInfoSourceTopic = "dwd_order_refund_info";
        String commentInfoSourceTopic = "dwd_comment_info";
        String orderWideSourceTopic = "dwm_order_wide";
        String paymentWideSourceTopic = "dwm_payment_wide";

        //2.2 Page log: click and display (exposure) events
        FlinkKafkaConsumer<String> pageViewSource = MyKafkaUtil.getKafkaSource(pageViewSourceTopic, groupId);
        DataStreamSource<String> pageViewDStream = env.addSource(pageViewSource);

        //2.3 dwd_favor_info: favor (wish-list) events
        FlinkKafkaConsumer<String> favorInfoSource = MyKafkaUtil.getKafkaSource(favorInfoSourceTopic, groupId);
        DataStreamSource<String> favorInfoDStream = env.addSource(favorInfoSource);

        //2.4 dwd_cart_info: add-to-cart events
        FlinkKafkaConsumer<String> cartInfoSource = MyKafkaUtil.getKafkaSource(cartInfoSourceTopic, groupId);
        DataStreamSource<String> cartInfoDStream = env.addSource(cartInfoSource);

        //2.5 dwm_order_wide: order data
        FlinkKafkaConsumer<String> orderWideSource = MyKafkaUtil.getKafkaSource(orderWideSourceTopic, groupId);
        DataStreamSource<String> orderWideDStream = env.addSource(orderWideSource);

        //2.6 dwm_payment_wide: payment data
        FlinkKafkaConsumer<String> paymentWideSource = MyKafkaUtil.getKafkaSource(paymentWideSourceTopic, groupId);
        DataStreamSource<String> paymentWideDStream = env.addSource(paymentWideSource);

        //2.7 dwd_order_refund_info: refund data
        FlinkKafkaConsumer<String> refundInfoSource = MyKafkaUtil.getKafkaSource(refundInfoSourceTopic, groupId);
        DataStreamSource<String> refundInfoDStream = env.addSource(refundInfoSource);

        //2.8 dwd_comment_info: comment/appraisal data
        FlinkKafkaConsumer<String> commentInfoSource = MyKafkaUtil.getKafkaSource(commentInfoSourceTopic, groupId);
        DataStreamSource<String> commentInfoDStream = env.addSource(commentInfoSource);

        //TODO 3. Normalize every stream into the ProductStats shape
        //3.1 Clicks and displays from the page log   jsonStr -> ProductStats
        SingleOutputStreamOperator<ProductStats> productClickAndDisplayDS = pageViewDStream.process(
                new ProcessFunction<String, ProductStats>() {
                    @Override
                    public void processElement(String jsonStr, Context ctx, Collector<ProductStats> out) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        JSONObject pageJsonObj = jsonObj.getJSONObject("page");
                        String pageId = pageJsonObj.getString("page_id");
                        if (pageId == null) {
                            // Malformed page event; dump it and keep going.
                            System.out.println(">>>>" + jsonObj);
                        }
                        // Event time of the log record (epoch millis).
                        Long ts = jsonObj.getLong("ts");
                        // Visiting the product-detail page counts as one click on that SKU.
                        if ("good_detail".equals(pageId)) {
                            Long skuId = pageJsonObj.getLong("item");
                            out.collect(ProductStats.builder().sku_id(skuId).click_ct(1L).ts(ts).build());
                        }
                        // Each SKU entry in "displays" counts as one exposure.
                        JSONArray displays = jsonObj.getJSONArray("displays");
                        if (displays != null && displays.size() > 0) {
                            for (int i = 0; i < displays.size(); i++) {
                                JSONObject displayJsonObj = displays.getJSONObject(i);
                                if ("sku_id".equals(displayJsonObj.getString("item_type"))) {
                                    Long skuId = displayJsonObj.getLong("item");
                                    out.collect(ProductStats.builder().sku_id(skuId).display_ct(1L).ts(ts).build());
                                }
                            }
                        }
                    }
                }
        ).assignTimestampsAndWatermarks(productStatsWatermark());

        // Register the click/display stream as a table; "rowtime" exposes the
        // stream's event time as a TIMESTAMP(3) metadata column usable by TUMBLE.
        Table clickAndDisplayTable = tableEnv.fromDataStream(productClickAndDisplayDS,
                Schema.newBuilder().columnByMetadata("rowtime", "TIMESTAMP(3)").build());
        tableEnv.createTemporaryView("click", clickAndDisplayTable);

        // Window expression for the tables that key off the raw epoch-millis
        // "ts" POJO field instead of the metadata rowtime column.
        String tsWindowExpr = "TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss'))";

        // Per-SKU display/click counts over 10s tumbling windows.
        String clickSql = buildWindowedStatsSql("click", "rowtime",
                Arrays.asList("display_ct", "click_ct"));
        Table clickResultTable = tableEnv.sqlQuery(clickSql);
        DataStream<ProductStats> clickStatsStream = tableEnv.toAppendStream(clickResultTable, ProductStats.class);

        //3.2 Order wide table   jsonStr -> ProductStats
        SingleOutputStreamOperator<ProductStats> orderWideStatsDS = orderWideDStream.map(
                new MapFunction<String, ProductStats>() {
                    @Override
                    public ProductStats map(String jsonStr) throws Exception {
                        OrderWide orderWide = JSON.parseObject(jsonStr, OrderWide.class);
                        // create_time is a formatted date string; convert to epoch millis.
                        Long ts = DateTimeUtil.toTs(orderWide.getCreate_time());
                        return ProductStats.builder()
                                .sku_id(orderWide.getSku_id())
                                .order_sku_num(orderWide.getSku_num())
                                .order_amount(orderWide.getSplit_total_amount())
                                .ts(ts)
                                // One record contributes one order count; the
                                // original singleton-HashSet expression always
                                // evaluated to exactly 1.
                                .order_ct(1L)
                                .build();
                    }
                }
        ).assignTimestampsAndWatermarks(productStatsWatermark());

        Table orderWideTable = tableEnv.fromDataStream(orderWideStatsDS,
                Schema.newBuilder().columnByMetadata("ts.rowtime", "TIMESTAMP(3)").build());
        tableEnv.createTemporaryView("orderwide", orderWideTable);

        // Order count, SKU quantity and order amount per SKU per window.
        String orderWideSql = buildWindowedStatsSql("orderwide", tsWindowExpr,
                Arrays.asList("order_sku_num", "order_amount", "order_ct"));
        Table orderWideResultTable = tableEnv.sqlQuery(orderWideSql);
        DataStream<ProductStats> orderWideStatsStream = tableEnv.toAppendStream(orderWideResultTable, ProductStats.class);

        //3.3 Favor stream   jsonStr -> ProductStats
        SingleOutputStreamOperator<ProductStats> favorStatsDS = favorInfoDStream.map(
                new MapFunction<String, ProductStats>() {
                    @Override
                    public ProductStats map(String jsonStr) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        Long ts = DateTimeUtil.toTs(jsonObj.getString("create_time"));
                        return ProductStats.builder()
                                .sku_id(jsonObj.getLong("sku_id"))
                                .favor_ct(1L)
                                .ts(ts)
                                .build();
                    }
                }
        ).assignTimestampsAndWatermarks(productStatsWatermark());

        Table favorTable = tableEnv.fromDataStream(favorStatsDS,
                Schema.newBuilder().columnByMetadata("ts.rowtime", "TIMESTAMP(3)").build());
        tableEnv.createTemporaryView("favor", favorTable);

        // Favor count per SKU per window.
        String favorSql = buildWindowedStatsSql("favor", tsWindowExpr,
                Collections.singletonList("favor_ct"));
        Table favorResultTable = tableEnv.sqlQuery(favorSql);
        DataStream<ProductStats> favorStatsStream = tableEnv.toAppendStream(favorResultTable, ProductStats.class);

        //3.4 Cart stream   jsonStr -> ProductStats
        SingleOutputStreamOperator<ProductStats> cartStatsDS = cartInfoDStream.map(
                new MapFunction<String, ProductStats>() {
                    @Override
                    public ProductStats map(String jsonStr) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        Long ts = DateTimeUtil.toTs(jsonObj.getString("create_time"));
                        return ProductStats.builder()
                                .sku_id(jsonObj.getLong("sku_id"))
                                .cart_ct(1L)
                                .ts(ts)
                                .build();
                    }
                }
        ).assignTimestampsAndWatermarks(productStatsWatermark());

        Table cartTable = tableEnv.fromDataStream(cartStatsDS,
                Schema.newBuilder().columnByMetadata("ts.rowtime", "TIMESTAMP(3)").build());
        tableEnv.createTemporaryView("cart", cartTable);

        // Add-to-cart count per SKU per window.
        String cartSql = buildWindowedStatsSql("cart", tsWindowExpr,
                Collections.singletonList("cart_ct"));
        Table cartResultTable = tableEnv.sqlQuery(cartSql);
        DataStream<ProductStats> cartStatsStream = tableEnv.toAppendStream(cartResultTable, ProductStats.class);

        //3.5 Payment stream   jsonStr -> ProductStats
        SingleOutputStreamOperator<ProductStats> paymentStatsDS = paymentWideDStream.map(
                new MapFunction<String, ProductStats>() {
                    @Override
                    public ProductStats map(String jsonStr) throws Exception {
                        PaymentWide paymentWide = JSON.parseObject(jsonStr, PaymentWide.class);
                        Long ts = DateTimeUtil.toTs(paymentWide.getPayment_create_time());
                        return ProductStats.builder()
                                .sku_id(paymentWide.getSku_id())
                                .payment_amount(paymentWide.getSplit_total_amount())
                                // One record = one paid order line (the original
                                // singleton-HashSet expression was always 1).
                                .paid_order_ct(1L)
                                .ts(ts)
                                .build();
                    }
                }
        ).assignTimestampsAndWatermarks(productStatsWatermark());

        Table paymentTable = tableEnv.fromDataStream(paymentStatsDS,
                Schema.newBuilder().columnByMetadata("ts.rowtime", "TIMESTAMP(3)").build());
        tableEnv.createTemporaryView("payment", paymentTable);

        // Payment amount and paid-order count per SKU per window.
        String paymentSql = buildWindowedStatsSql("payment", tsWindowExpr,
                Arrays.asList("payment_amount", "paid_order_ct"));
        Table paymentResultTable = tableEnv.sqlQuery(paymentSql);
        DataStream<ProductStats> paymentStatsStream = tableEnv.toAppendStream(paymentResultTable, ProductStats.class);

        //3.6 Refund stream   jsonStr -> ProductStats
        SingleOutputStreamOperator<ProductStats> refundStatsDS = refundInfoDStream.map(
                jsonStr -> {
                    JSONObject refundJsonObj = JSON.parseObject(jsonStr);
                    Long ts = DateTimeUtil.toTs(refundJsonObj.getString("create_time"));
                    return ProductStats.builder()
                            .sku_id(refundJsonObj.getLong("sku_id"))
                            .refund_amount(refundJsonObj.getBigDecimal("refund_amount"))
                            // One record = one refunded order line (the original
                            // singleton-HashSet expression was always 1).
                            .refund_order_ct(1L)
                            .ts(ts)
                            .build();
                }).assignTimestampsAndWatermarks(productStatsWatermark());

        Table refundTable = tableEnv.fromDataStream(refundStatsDS,
                Schema.newBuilder().columnByMetadata("ts.rowtime", "TIMESTAMP(3)").build());
        tableEnv.createTemporaryView("refund", refundTable);

        // Refund count and refund amount per SKU per window.
        String refundSql = buildWindowedStatsSql("refund", tsWindowExpr,
                Arrays.asList("refund_order_ct", "refund_amount"));
        Table refundResultTable = tableEnv.sqlQuery(refundSql);
        DataStream<ProductStats> refundStatsStream = tableEnv.toAppendStream(refundResultTable, ProductStats.class);

        //3.7 Comment stream   jsonStr -> ProductStats
        SingleOutputStreamOperator<ProductStats> commentStatsDS = commentInfoDStream.map(
                jsonStr -> {
                    JSONObject commentJsonObj = JSON.parseObject(jsonStr);
                    Long ts = DateTimeUtil.toTs(commentJsonObj.getString("create_time"));
                    // Good comments additionally bump good_comment_ct.
                    Long goodCt = GmallConstant.APPRAISE_GOOD.equals(commentJsonObj.getString("appraise")) ? 1L : 0L;
                    return ProductStats.builder()
                            .sku_id(commentJsonObj.getLong("sku_id"))
                            .comment_ct(1L)
                            .good_comment_ct(goodCt)
                            .ts(ts)
                            .build();
                }).assignTimestampsAndWatermarks(productStatsWatermark());

        Table commentTable = tableEnv.fromDataStream(commentStatsDS,
                Schema.newBuilder().columnByMetadata("ts.rowtime", "TIMESTAMP(3)").build());
        tableEnv.createTemporaryView("common", commentTable);

        // Comment counts per SKU per window.
        String commentSql = buildWindowedStatsSql("common", tsWindowExpr,
                Arrays.asList("comment_ct", "good_comment_ct"));
        Table commentResultTable = tableEnv.sqlQuery(commentSql);
        DataStream<ProductStats> commentStatsStream = tableEnv.toAppendStream(commentResultTable, ProductStats.class);

        //TODO 4. Union the seven windowed streams into one
        DataStream<ProductStats> unionStream = clickStatsStream.union(
                cartStatsStream, commentStatsStream, favorStatsStream,
                orderWideStatsStream, paymentStatsStream, refundStatsStream);

        // Convert the POJOs back into JSON objects so the async dimension
        // functions can attach extra attributes in place.
        SingleOutputStreamOperator<JSONObject> jsonStream = unionStream.map(
                new MapFunction<ProductStats, JSONObject>() {
                    @Override
                    public JSONObject map(ProductStats productStats) throws Exception {
                        return JSON.parseObject(JSON.toJSONString(productStats));
                    }
                });

        //TODO 9. Attach dimension data (async lookups, 60s timeout each)
        //9.1 SKU dimension
        SingleOutputStreamOperator<JSONObject> withSkuStream = AsyncDataStream.unorderedWait(
                jsonStream,
                new DimAsyncFunctionNew1(GmallConfig.DIM_SKU_INFO),
                60, TimeUnit.SECONDS);

        //9.2 SPU dimension
        SingleOutputStreamOperator<JSONObject> withSpuStream = AsyncDataStream.unorderedWait(
                withSkuStream,
                new DimAsyncFunctionNew1(GmallConfig.DIM_SPU_INFO),
                60, TimeUnit.SECONDS);

        //9.3 Trademark dimension
        SingleOutputStreamOperator<JSONObject> withTrademarkStream = AsyncDataStream.unorderedWait(
                withSpuStream,
                new DimAsyncFunctionNew1(GmallConfig.DIM_BASE_TRADEMARK),
                60, TimeUnit.SECONDS);

        //9.4 Category3 dimension
        // BUGFIX: this join previously consumed the SPU stream, silently
        // discarding the trademark attributes added in step 9.3; it now chains
        // off the trademark-joined stream.
        SingleOutputStreamOperator<JSONObject> withCategoryStream = AsyncDataStream.unorderedWait(
                withTrademarkStream,
                new DimAsyncFunctionNew1(GmallConfig.DIM_BASE_CATEGORY3),
                60, TimeUnit.SECONDS);

        // Back to the ProductStats POJO for the ClickHouse sink.
        SingleOutputStreamOperator<ProductStats> productStatsWithDimsDS = withCategoryStream.map(
                new MapFunction<JSONObject, ProductStats>() {
                    @Override
                    public ProductStats map(JSONObject jsonObject) throws Exception {
                        return JSON.toJavaObject(jsonObject, ProductStats.class);
                    }
                });

        productStatsWithDimsDS.print(">>>>");

        //TODO 10. Write the aggregated rows to ClickHouse
        productStatsWithDimsDS.addSink(
                ClickHouseUtil.<ProductStats>getJdbcSink(
                        "insert into product_stats_0820 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

        //TODO 11. (disabled) Also publish the result to the dws Kafka layer:
        //productStatsWithDimsDS
        //        .map(productStat -> JSON.toJSONString(productStat, new SerializeConfig(true)))
        //        .addSink(MyKafkaUtil.getKafkaSink("dws_product_stats"));

        env.execute();
    }

    /**
     * Returns a fresh 3-second bounded-out-of-orderness watermark assigner
     * that reads event time from {@link ProductStats#getTs()} (epoch millis).
     * Each stream needs its own instance, hence a factory method.
     */
    private static BoundedOutOfOrdernessTimestampExtractor<ProductStats> productStatsWatermark() {
        return new BoundedOutOfOrdernessTimestampExtractor<ProductStats>(Time.seconds(3)) {
            @Override
            public long extractTimestamp(ProductStats productStats) {
                return productStats.getTs();
            }
        };
    }

    /**
     * Builds the 10-second tumbling-window aggregation SQL shared by every
     * source stream. The result always carries the full measure column list in
     * a fixed order; measures not in {@code measuredColumns} are emitted as
     * sum(0) so the seven result streams stay union-compatible.
     *
     * @param tableName       registered temporary view to aggregate
     * @param timeExpr        SQL expression yielding the window time attribute
     * @param measuredColumns measure columns actually carried by this stream
     * @return complete SQL query text
     */
    private static String buildWindowedStatsSql(String tableName, String timeExpr, List<String> measuredColumns) {
        String[] allMeasures = {
                "display_ct", "click_ct", "favor_ct", "cart_ct",
                "order_sku_num", "order_amount", "order_ct",
                "payment_amount", "paid_order_ct",
                "refund_order_ct", "refund_amount",
                "comment_ct", "good_comment_ct"
        };
        StringBuilder sql = new StringBuilder();
        sql.append("select ")
                .append("DATE_FORMAT(TUMBLE_START(").append(timeExpr)
                .append(", INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') stt, ")
                .append("DATE_FORMAT(TUMBLE_END(").append(timeExpr)
                .append(", INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') edt, ")
                .append("sku_id");
        for (String measure : allMeasures) {
            // Real measures are summed; the rest are zero-filled placeholders.
            String expr = measuredColumns.contains(measure) ? measure : "0";
            sql.append(", sum(").append(expr).append(") as ").append(measure);
        }
        sql.append(" from ").append(tableName)
                .append(" group by TUMBLE(").append(timeExpr)
                .append(", INTERVAL '10' SECOND), sku_id");
        return sql.toString();
    }
}